通过PySpark开发Spark应用 您所在的位置:网站首页 云原生数据仓库AnalyticDB MySQL 通过PySpark开发Spark应用

通过PySpark开发Spark应用

2024-06-12 14:11| 来源: 网络整理| 查看: 265

前提条件

已创建AnalyticDB MySQL湖仓版。具体操作,请参见创建集群。

已创建Job型资源组。具体操作,请参见新建资源组。

已创建数据库账号。

如果您是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建数据库账号。

如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。

已开通OSS服务,并创建存储空间。具体操作,请参见开通OSS服务和控制台创建存储空间。

PySpark的基本用法

编写如下示例程序,并将示例程序存储为example.py。

from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show()

将example.py程序上传到OSS中。具体操作,请参见控制台上传文件。

进入Spark开发编辑器。

登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。

在左侧导航栏,单击作业开发>Spark Jar 开发

在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

在编辑器中执行以下作业内容。

{ "name": "Spark Python Test", "file": "oss://{your oss bucket path}/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }

参数说明请参见参数说明。

使用Python依赖使用方法

如果您使用自行开发或第三方开发的依赖开发Python程序时,需将使用的依赖上传至OSS中,并在提交Spark作业时配置pyFiles参数。

示例

本文示例以引入自定义函数计算员工的税后收入为例。示例将数据文件staff.csv上传至OSS中。staff.csv中的示例数据如下:

name,age,gender,salary Lucky,25,male,100 Lucy,23,female,150 Martin,30,male,180 Rose,31,female,200

开发依赖并上传至OSS中。

创建名为tools的文件夹,并在该文件夹下创建名为func.py的程序。

def tax(salary): """ convert string to int and cut 15% tax from the salary :param salary: The salary of staff worker :return: """ return 0.15 * int(salary)

将tools文件夹压缩后上传至OSS中。本文示例为tools.tar.gz。

说明

如果依赖多个Python文件,建议您使用gz压缩包进行压缩。您可以在Python代码中以module方式引用Python文件。

编写名为example.py的示例程序。

from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # import third party file from tools import func if __name__ == "__main__": # init pyspark context spark = SparkSession.builder.appName("Python Example").getOrCreate() # read csv from oss to a dataframe, show the table cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # print schema and data to the console df.printSchema() df.show() # create an udf taxCut = udf(lambda salary: func.tax(salary), FloatType()) # cut tax from salary and show result df.select("name", taxCut("salary").alias("final salary")).show() spark.stop()

将example.py程序上传到OSS中。具体操作,请参见控制台上传文件。

进入Spark开发编辑器。

登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。

在左侧导航栏,单击作业开发>Spark Jar 开发

在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

在编辑器中执行以下作业内容。

{ "name": "Spark Python", "file": "oss:///example.py", "pyFiles": ["oss:///tools.tar.gz"], "args": [ "oss:///staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }

参数说明:

file:Python程序所在的OSS路径。

pyFiles:PySpark依赖的Python文件所在的OSS路径,后缀可以是tar或tar.gz。多个压缩包使用英文逗号(,)分隔。

说明

PySpark应用所依赖的所有Python文件必须存储在OSS中。

args:使用JAR包时需要使用的参数。本文为staff.csv示例数据所在的OSS路径。

更多参数,请参见参数说明。

使用Virtual Environments打包依赖环境

开发Python作业时,如果您遇到复杂的依赖环境,可以通过Python的Virtual Environments技术进行环境管理和隔离。AnalyticDB MySQL Spark支持使用Virtual Environments将本地依赖的环境打包并上传到OSS中。关于Virtual Environments的更多信息,请参见Python官方社区文档。

重要

AnalyticDB MySQL Spark使用的glibc-devel版本为2.28,若Virtual Environments不兼容2.28版本,PySpark任务可能无法正常执行。

使用方法

使用Virtual Environments打包Python环境,需将压缩包上传至OSS中,并在提交Spark作业时配置archives和spark.pyspark.python参数。

示例

准备Linux环境。

Virtual Environments需在Linux操作系统中打包Python环境,您可以通过以下三种方式准备Linux环境。本文以购买阿里云ECS实例为例。

购买操作系统为Centos 7或AnolisOS 8的阿里云ECS实例。具体操作,请参见自定义购买实例。

在本地安装Centos 7或者AnolisOS 8以上版本的操作系统。

使用Centos或AnolisOS的官方Docker镜像,在镜像内部打包Python环境。

使用Virtual Environments打包Python运行环境,并将压缩包上传至OSS中。

使用Virtualenv或Conda打包项目依赖的Python环境,打包时可自定义Python的版本。此处以Virtualenv打包为例。

# Create directory venv at current path with python3 # MUST ADD --copies ! virtualenv --copies --download --python python3.7 venv # active environment source venv/bin/activate # install third party modules pip install scikit-spark==0.4.0 # check the result pip list # compress the environment tar -czvf venv.tar.gz venv说明

如果您想通过Conda打包项目依赖,请参见Conda管理虚拟环境。

进入Spark开发编辑器。

登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。

在左侧导航栏,单击作业开发>Spark Jar 开发

在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

在编辑器中执行以下作业内容。

{ "name": "venv example", "archives": [ "oss:///venv.tar.gz#PY3" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss:///example.py" }

参数说明:

archives:Python环境压缩包所在的OSS路径。本文示例为venv.tar.gz压缩包所在的OSS路径。

spark.pyspark.python:指定要使用的Python解释器的本地路径。

更多参数,请参见参数说明。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有